/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

package org.apache.storm.utils;

import com.codahale.metrics.Meter;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ShellUtils {
    public static final Logger LOG = LoggerFactory.getLogger(ShellUtils.class);
    public static final OSType osType = getOSType();
    // Helper static vars for each platform
    public static final boolean WINDOWS = (osType == OSType.OS_TYPE_WIN);
    public static final boolean SOLARIS = (osType == OSType.OS_TYPE_SOLARIS);
    public static final boolean MAC = (osType == OSType.OS_TYPE_MAC);
    public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD);
    public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX);
    public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER);

    //Meter declared here can be registered by any daemon, and is currently used by Supervisor
    public static final Meter numShellExceptions = new Meter();

    /**
     * Token separator regex used to parse Shell tool outputs.
     */
    public static final String TOKEN_SEPARATOR_REGEX
        = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
    private final boolean redirectErrorStream; // merge stdout and stderr
    /**
     * Time after which the executing script would be timed out.
     */
    protected long timeOutInterval = 0L;
    private long interval;   // refresh interval in msec
    private long lastTime;   // last time the command was performed
    private Map<String, String> environment; // env for the command execution
    private File dir;
    private Process process; // sub process used to execute the command
    private int exitCode;
    /**
     * If or not script timed out.
     */
    private AtomicBoolean timedOut;
    /**
     * If or not script finished executing.
     */
    private volatile AtomicBoolean completed;

    public ShellUtils() {
        this(0L);
    }

    public ShellUtils(long interval) {
        this(interval, false);
    }

    /**
     * Creates a new shell utils instance.
     * @param interval the minimum duration to wait before re-executing the command
     */
    public ShellUtils(long interval, boolean redirectErrorStream) {
        this.interval = interval;
        this.lastTime = (interval < 0) ? 0 : -interval;
        this.redirectErrorStream = redirectErrorStream;
    }

    @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
    private static OSType getOSType() {
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Windows")) {
            return OSType.OS_TYPE_WIN;
        } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
            return OSType.OS_TYPE_SOLARIS;
        } else if (osName.contains("Mac")) {
            return OSType.OS_TYPE_MAC;
        } else if (osName.contains("FreeBSD")) {
            return OSType.OS_TYPE_FREEBSD;
        } else if (osName.startsWith("Linux")) {
            return OSType.OS_TYPE_LINUX;
        } else {
            // Some other form of Unix
            return OSType.OS_TYPE_OTHER;
        }
    }

    /**
     * a Unix command to get a given user's groups list. Windows is not supported.
     */
    public static String[] getGroupsForUserCommand(final String user) {
        if (WINDOWS) {
            throw new UnsupportedOperationException("Getting user groups is not supported on Windows");
        }
        //'groups username' command return is non-consistent across different unixes
        return new String[]{"id", "-Gn", user};
    }

    private static void joinThread(Thread t) {
        while (t.isAlive()) {
            try {
                t.join();
            } catch (InterruptedException ie) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Interrupted while joining on: " + t, ie);
                }
                t.interrupt(); // propagate interrupt
            }
        }
    }

    public static ShellLogHandler getLogHandler(Map<String, Object> topoConf) {
        if (topoConf == null) {
            throw new IllegalArgumentException("Config is required");
        }

        String logHandlerClassName = null;
        if (topoConf.containsKey(Config.TOPOLOGY_MULTILANG_LOG_HANDLER)) {
            try {
                logHandlerClassName = topoConf.get(Config.TOPOLOGY_MULTILANG_LOG_HANDLER).toString();
                return (ShellLogHandler) Class.forName(logHandlerClassName).newInstance();
            } catch (ClassCastException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
                throw new RuntimeException("Error loading ShellLogHandler " + logHandlerClassName, e);
            }
        }
        return new DefaultShellLogHandler();
    }

    /**
     * get the exit code.
     *
     * @return the exit code of the process
     */
    public int getExitCode() {
        return exitCode;
    }

    /**
     * set the environment for the command.
     *
     * @param env Mapping of environment variables
     */
    protected void setEnvironment(Map<String, String> env) {
        this.environment = env;
    }

    /**
     * set the working directory.
     *
     * @param dir The directory where the command would be executed
     */
    protected void setWorkingDirectory(File dir) {
        this.dir = dir;
    }

    /**
     * check to see if a command needs to be executed and execute if needed.
     */
    protected void run() throws IOException {
        if (lastTime + interval > System.currentTimeMillis()) {
            return;
        }
        exitCode = 0; // reset for next run
        try {
            runCommand();
        } catch (IOException e) {
            numShellExceptions.mark();
            throw e;
        }
    }

    /**
     * Run a command.
     */
    private void runCommand() throws IOException {
        ProcessBuilder builder = new ProcessBuilder(getExecString());
        Timer timeOutTimer = null;
        ShellTimeoutTimerTask timeoutTimerTask = null;
        timedOut = new AtomicBoolean(false);
        completed = new AtomicBoolean(false);

        if (environment != null) {
            builder.environment().putAll(this.environment);
        }
        if (dir != null) {
            builder.directory(this.dir);
        }

        builder.redirectErrorStream(redirectErrorStream);
        process = builder.start();

        if (timeOutInterval > 0) {
            timeOutTimer = new Timer("Shell command timeout");
            timeoutTimerTask = new ShellTimeoutTimerTask(this);
            //One time scheduling.
            timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
        }
        final BufferedReader errReader =
            new BufferedReader(new InputStreamReader(process
                                                         .getErrorStream()));
        BufferedReader inReader =
            new BufferedReader(new InputStreamReader(process
                                                         .getInputStream()));
        final StringBuffer errMsg = new StringBuffer();

        // read error and input streams as this would free up the buffers
        // free the error stream buffer
        Thread errThread = new Thread() {
            @Override
            public void run() {
                try {
                    String line = errReader.readLine();
                    while ((line != null) && !isInterrupted()) {
                        errMsg.append(line);
                        errMsg.append(System.getProperty("line.separator"));
                        line = errReader.readLine();
                    }
                } catch (IOException ioe) {
                    LOG.warn("Error reading the error stream", ioe);
                }
            }
        };
        try {
            errThread.start();
        } catch (IllegalStateException ise) {
            //ignore
        }
        try {
            parseExecResult(inReader); // parse the output
            // clear the input stream buffer
            String line = inReader.readLine();
            while (line != null) {
                line = inReader.readLine();
            }
            // wait for the process to finish and check the exit code
            exitCode = process.waitFor();
            // make sure that the error thread exits
            joinThread(errThread);
            completed.set(true);
            //the timeout thread handling
            //taken care in finally block
            if (exitCode != 0) {
                throw new ExitCodeException(exitCode, errMsg.toString());
            }
        } catch (InterruptedException ie) {
            throw new IOException(ie.toString());
        } finally {
            if (timeOutTimer != null) {
                timeOutTimer.cancel();
            }
            // close the input stream
            try {
                // JDK 7 tries to automatically drain the input streams for us
                // when the process exits, but since close is not synchronized,
                // it creates a race if we close the stream first and the same
                // fd is recycled.  the stream draining thread will attempt to
                // drain that fd!!  it may block, OOM, or cause bizarre behavior
                // see: https://bugs.openjdk.java.net/browse/JDK-8024521
                //      issue is fixed in build 7u60
                InputStream stdout = process.getInputStream();
                synchronized (stdout) {
                    inReader.close();
                }
            } catch (IOException ioe) {
                LOG.warn("Error while closing the input stream", ioe);
            }
            if (!completed.get()) {
                errThread.interrupt();
                joinThread(errThread);
            }
            try {
                InputStream stderr = process.getErrorStream();
                synchronized (stderr) {
                    errReader.close();
                }
            } catch (IOException ioe) {
                LOG.warn("Error while closing the error stream", ioe);
            }
            process.destroy();
            lastTime = System.currentTimeMillis();
        }
    }

    /**
     * return an array containing the command name & its parameters.
     */
    protected abstract String[] getExecString();

    /**
     * Parse the execution result.
     */
    protected abstract void parseExecResult(BufferedReader lines)
        throws IOException;

    /**
     * get the current sub-process executing the given command.
     *
     * @return process executing the command
     */
    public Process getProcess() {
        return process;
    }

    /**
     * To check if the passed script to shell command executor timed out or not.
     *
     * @return if the script timed out.
     */
    public boolean isTimedOut() {
        return timedOut.get();
    }

    /**
     * Set if the command has timed out.
     */
    private void setTimedOut() {
        this.timedOut.set(true);
    }

    /**
     * OSType detection.
     */
    @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
    public enum OSType {
        OS_TYPE_LINUX,
        OS_TYPE_WIN,
        OS_TYPE_SOLARIS,
        OS_TYPE_MAC,
        OS_TYPE_FREEBSD,
        OS_TYPE_OTHER
    }

    /**
     * This is an IOException with exit code added.
     */
    public static class ExitCodeException extends IOException {
        int exitCode;

        public ExitCodeException(int exitCode, String message) {
            super(message);
            this.exitCode = exitCode;
        }

        public int getExitCode() {
            return exitCode;
        }
    }

    /**
     * A simple shell command executor.
     *
     * <code>ShellCommandExecutor</code>should be used in cases where the output
     * of the command needs no explicit parsing and where the command, working directory and the environment remains unchanged. The output
     * of the command is stored as-is and is expected to be small.
     */
    public static class ShellCommandExecutor extends ShellUtils {

        private String[] command;
        private StringBuffer output;


        public ShellCommandExecutor(String[] execString) {
            this(execString, null);
        }

        public ShellCommandExecutor(String[] execString, File dir) {
            this(execString, dir, null);
        }

        public ShellCommandExecutor(String[] execString, File dir,
                                    Map<String, String> env) {
            this(execString, dir, env, 0L);
        }

        /**
         * Create a new instance of the ShellCommandExecutor to execute a command.
         *
         * @param execString The command to execute with arguments
         * @param dir        If not-null, specifies the directory which should be set as the current working directory for the command. If
         *                   null, the current working directory is not modified.
         * @param env        If not-null, environment of the command will include the key-value pairs specified in the map. If null, the
         *                   current environment is not modified.
         * @param timeout    Specifies the time in milliseconds, after which the command will be killed and the status marked as timedout.
         *                   If 0, the command will not be timed out.
         */
        public ShellCommandExecutor(String[] execString, File dir,
                                    Map<String, String> env, long timeout) {
            command = execString.clone();
            if (dir != null) {
                setWorkingDirectory(dir);
            }
            if (env != null) {
                setEnvironment(env);
            }
            timeOutInterval = timeout;
        }


        /**
         * Execute the shell command.
         */
        public void execute() throws IOException {
            this.run();
        }

        @Override
        public String[] getExecString() {
            return command;
        }

        @Override
        protected void parseExecResult(BufferedReader lines) throws IOException {
            output = new StringBuffer();
            char[] buf = new char[512];
            int read;
            while ((read = lines.read(buf, 0, buf.length)) > 0) {
                output.append(buf, 0, read);
            }
        }

        /**
         * Get the output of the shell command.
         */
        public String getOutput() {
            return (output == null) ? "" : output.toString();
        }

        /**
         * Returns the commands of this instance. Arguments with spaces in are presented with quotes round; other arguments are presented
         * raw
         *
         * @return a string representation of the object.
         */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            String[] args = getExecString();
            for (String s : args) {
                if (s.indexOf(' ') >= 0) {
                    builder.append('"').append(s).append('"');
                } else {
                    builder.append(s);
                }
                builder.append(' ');
            }
            return builder.toString();
        }
    }

    /**
     * Timer which is used to timeout scripts spawned off by shell.
     */
    private static class ShellTimeoutTimerTask extends TimerTask {

        private ShellUtils shell;

        ShellTimeoutTimerTask(ShellUtils shell) {
            this.shell = shell;
        }

        @Override
        public void run() {
            Process p = shell.getProcess();
            try {
                p.exitValue();
            } catch (Exception e) {
                //Process has not terminated.
                //So check if it has completed
                //if not just destroy it.
                if (p != null && !shell.completed.get()) {
                    shell.setTimedOut();
                    p.destroy();
                }
            }
        }
    }
}
